Spring MVC 处理异步请求(上)- CALLABLE 详解

Spring 在 3.0 之后引入了异步请求的功能,这种请求在服务器会返回 Callable 的对象,或者是 DeferredResult 对象。Callable 和 DeferredResult 会将其中的逻辑交给其他线程进行处理,因此,Spring 在处理耗时较长的请求时就不会出现阻塞的现象,使得其他的请求可以如期地进来。关于异步请求的使用,需要将比较耗时的处理逻辑放入 TaskExecutor 中,这样就可以使该逻辑进入其他线程操作。同时主线程释放资源,使得该线程可以去接收其他的请求。这让我们的 web 应用可以“分身”去即时处理更多的请求。本文是 Spring MVC 文档实践中针对于异步请求实现的准备部分,我们来看看 Callable 的运行原理。

Callable 接口是 jdk 1.5 时新加入的接口类,根据源码注释它是

A task that returns a result and may throw an exceptiion.

因此,Callable 也是一个任务,他可以放入线程池中执行,并且可以返回一个结果。而 Runnable 的接口就不提供返回结果的功能。我们可以从一个 Callable 的实现实例入手,来看 Callable 在线程池中的运行原理。

Callable 对象实例 demo

这部分的代码由于需要直接运行,存在于我们的 spring-doc 工程的 test 路径下。首先我们来实现 Callable 接口,该类用于计算阶乘(x!)的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class IntegerTask implements Callable<Integer> {
int number;

public IntegerTask(int number) {
this.number = number;
}

@Override
public Integer call() throws InvalidParameterException {
int ret = 1;
for (int i = 1; i <= number; i++) {
ret *= i;
}
return ret;
}
}

之后,我们的测试方法需要初始化 ExecutorService 来运行该对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CallableTest {

@Test
public void testCallable() {

IntegerTask integerTask = new IntegerTask(5);
ExecutorService executorService = Executors.newSingleThreadExecutor();

Future<Integer> futureTask = executorService.submit(integerTask);
try {
Assert.assertEquals(futureTask.get(5, TimeUnit.SECONDS).intValue(), 120);
} catch (Exception ex) {

}
}
}

在我们运行该测试例之后,可以预见,测试例是通过的。

Callable 及其在线程池中运行原理解析

Callable 接口比较简单,只需要我们实现 call() 方法,即可让你的方法在被扔进线程池中执行,从而不占用你的主线程并且在后面可以调用 call 的结果。

在调试上述测试程序的过程中,我们看到 futureTask 变量在调用栈中是一个 FutureTask 类。而我们后续的结果调用都是通过这个 FutureTask 类,因此,我们可以通过 FutureTask 的解析来搞清楚 Callable 是如何运行的。

FutureTask

FutureTask 构造函数

首先是 FutureTask 的构造函数:

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

可以看到 FutureTask 类封装了 Callable 实例,让 Callable 实例可以在 FutureTask 的控制下异步运行。其中我们可以发现一个 state 变量,这应当是表现一个 FutureTask 实例当前状态的变量。

FutureTask 的状态

根据 NEW 的定义,我们找到了 FutureTask 的状态定义:

1
2
3
4
5
6
7
8
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

根据官方文档,可以画出 FutureTask 的一个生命周期,他可以走出如下的花样:

FutureTask 状态变化

可以稍微注意一下 state 的定义后面的 int,经过比较,我们可以通过 int 的大小大致判断 FutureTask 处于哪个状态。

FutureTask 运行

FutureTask 的 run() 方法中执行了其内部 Callable 实例的 call() 方法,并且返回结果,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

可以看到,在进入 run 方法时,首先要判定 state 是不是 NEW,如果状态爱不是 NEW,那么也没有启动的必要了。之后,还要判定 UNSAFE.compareAndSwapObject 方法是不是成功,这里做的是把当前的线程分配给这个 FutureTask,如果线程分配失败,那么 FutureTask 也是无法启动的。

当通过这两层检验之后,进入 try 的部分,我们还是要判断 Callable 实例的存在性以及 FutureTask 的当前状态。之后,需要初始化一个 result,其在 Callable 实例调用 call() 方法结束后,去获得这个结果(result=c.call())。最后,我们会进入两种结果,一种结果是 NORMAL,这个时候会去处理 result;而另一种结果是 Exceptional,这个时候要去处理 Callable 实例抛出的 Exception。

其实这其中还会有一种状态,便是 Callable 的执行被打断了,我们看到 finally 部分 s >= INTERRUPTING 后会去处理被打断的逻辑。这三种状态结果后面都会讲到。当前阶段,我们只需要知道 run() 方法中包含了 FutureTask 从 NEW 到后续的状态的演变,而针对这几种状态,FutureTask 都有对应的方法进行“善后处理”。但是,无论哪个状态,FutureTask 最后都是要把 runner 置为 null。

FutureTask 善后工作

刚才看到,当 FutureTask 调用 callable 的 call() 方法之后,无论 call() 中是完成了功能还是抛出 exception,都会调用对应的善后方法:set(result)/ setException(ex),首先看一下源码:

1
2
3
4
5
6
7
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
1
2
3
4
5
6
7
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

可以看到,set 类方法的逻辑很简单,首先把结果(成功执行的结果 v 或者错误的 Throwable 对象)赋值给 outcome,之后将 FutureTask 的状态设置为 NORMAL/EXCEPTIONAL;最后,这两个方法都需要执行一个 finishCompletion() 方法,我们再来看看 finishCompletion() 做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

finishCompletion 主要是释放这个占有的线程,将自己的 Callable 置为 null,是一系列收尾工作。

FutureTask 获取执行结果

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

从之前我们的 demo 中也可以看到,FutureTask 通过 get() 方法来获得最终的执行结果,而上面就是 get 方法的源码。主要内容是等待 FutureTask 的 state 到达 COMPLETING 的时候,返回最终的结果 outcome。该部分重要的逻辑在 awaitDone() 方法中。awaitDone 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

该方法的逻辑如下:

  • 检查方法调用时是否支持了阻塞模式,如果是 timed,则会设置一个 deadline,否则默认值为 0,不支持阻塞
  • 开启死循环模式,检查以下的条件:
    • 检查当前线程是否被 interrupted
      • 如果是,则抛出 InterruptedException
      • 如果不是,则继续执行后续循环。
    • 检查 s 是否处于 COMPLETING 之后(s > COMPLETING),表示该 FutureTask 已经处于任意一个可以退出的状态,则将该线程置 null,退出
    • 如果 s 还是在 COMPLETING,则调用 Thread.yield,这里只是一个信号表示处理器可以拿走这个线程的资源让它等待;当然,也可以继续让这个线程执行下去。
    • 后续检查这个 FutureTask 是否已经被处理器纳入 waitNode,如果没有,则创建 waitNode 对象,将其加入到 waiters 的队列中等待资源的分配。
    • 最后,判断是否支持阻塞,如果已经超时,则退出当前结果;如果未超时,则持续阻塞,直到下次被唤醒。

当 awaitDone 方法结束后出来之后,将会使用 report() 返回最终结果。

FutureTask 的取消

FutureTask 由于是分线程执行的,因此可以像其他线程任务一样可以被 Interrupted,而在 FutureTask 中使用的则是 cancel() 方法,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

Cancel 方法有个输入,表示取消方法是否可以在 FutureTask 在执行过程中被打断。之后,打断该线程的执行,并且执行扫尾工作。

总结

本文作为 Spring MVC 处理异步请求的准备篇,基于 Callable 的 demo 讲解了 Callable 的执行原理,背后其实是 FutureTask 在封装 Callable 的一些执行逻辑,为下一篇介绍 Spring MVC 返回 Callable 处理异步请求做准备。